[SPARK-2205][SPARK-7871][SQL] Avoid redundant exchange #6682

Closed
jeanlyn wants to merge 3 commits

Conversation

@jeanlyn (Contributor) commented Jun 6, 2015

Using only the output partitioning of a BinaryNode to decide whether a shuffle is needed can add unnecessary Exchange operators, for example in a multiway join.
This PR adds meetPartitions to SparkPlan to avoid redundant exchanges: it is used to save the partitioning of the node and of its children, and is reset to the node's own partitioning when a shuffle is needed. A minimal sketch of the rule in question follows.
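
A minimal sketch of that rule, with hypothetical Scala types (not Spark's actual planner API): an Exchange is inserted whenever a child's output partitioning does not exactly match what the parent requires, so a partitioning that is equivalent but expressed on different attributes still triggers a shuffle.

sealed trait Partitioning
case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning

// An Exchange is added unless the child is already partitioned exactly as required.
def needsExchange(child: Partitioning, requiredKeys: Seq[String], numPartitions: Int): Boolean =
  child != HashPartitioning(requiredKeys, numPartitions)

// Data partitioned on key#62 does not satisfy a requirement on the equivalent
// key#64, so a fresh shuffle is planned.
needsExchange(HashPartitioning(Seq("key#62"), 200), Seq("key#64"), 200)  // true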

@jeanlyn (Contributor, Author) commented Jun 6, 2015

cc @yhuai @chenghao-intel

@AmplabJenkins

Can one of the admins verify this patch?

@yhuai (Contributor) commented Jun 6, 2015

@jeanlyn Thanks for working on it. However, I am not sure this is the change we want for the long term. To fundamentally address this issue, we need to consider which attributes are equivalent (actually, I am planning to do it myself). I am not sure what meetPartitions does. What is the meaning of it? Why do we need to introduce another concept for data properties (we already have outputPartitioning and outputOrdering)?

@jeanlyn (Contributor, Author) commented Jun 7, 2015

@yhuai, thanks for the comment. In the current implementation in master, a join (BinaryNode) simply uses one side's partitioning as its own to judge whether a shuffle is needed, ignoring the other side's partitioning even when that side is already partitioned. This can cause unnecessary shuffles in a multiway join. For example:

table a (key string, value string)
table b (key string, value string)
table c (key string, value string)
table d (key string, value string)
table e (key string, value string)

select a.value,b.value,c.value,d.value,e.value from
a join b 
on a.key = b.key
join c
on a.key = c.key
join d
on b.key = d.key
join e
on c.key = e.key

we get:

Project [value#63,value#65,value#67,value#69,value#71]
 ShuffledHashJoin [key#66], [key#70], BuildRight
  Exchange (HashPartitioning [key#66], 200)
   Project [value#63,key#66,value#67,value#65,value#69]
    ShuffledHashJoin [key#64], [key#68], BuildRight
     Exchange (HashPartitioning [key#64], 200)
      Project [value#63,key#66,key#64,value#67,value#65]
       ShuffledHashJoin [key#62], [key#66], BuildRight
        ShuffledHashJoin [key#62], [key#64], BuildRight
         Exchange (HashPartitioning [key#62], 200)
          HiveTableScan [key#62,value#63], (MetastoreRelation default, a, None), None
         Exchange (HashPartitioning [key#64], 200)
          HiveTableScan [key#64,value#65], (MetastoreRelation default, b, None), None
        Exchange (HashPartitioning [key#66], 200)
         HiveTableScan [key#66,value#67], (MetastoreRelation default, c, None), None
     Exchange (HashPartitioning [key#68], 200)
      HiveTableScan [key#68,value#69], (MetastoreRelation default, d, None), None
  Exchange (HashPartitioning [key#70], 200)
   HiveTableScan [key#70,value#71], (MetastoreRelation default, e, None), None

But actually we just need:

Project [value#59,value#61,value#63,value#65,value#67]
 ShuffledHashJoin [key#62], [key#66], BuildRight
  Project [value#63,value#61,value#65,value#59,key#62]
   ShuffledHashJoin [key#60], [key#64], BuildRight
    Project [value#63,value#61,key#60,value#59,key#62]
     ShuffledHashJoin [key#58], [key#62], BuildRight
      ShuffledHashJoin [key#58], [key#60], BuildRight
        Exchange (HashPartitioning [key#58], 200)
         HiveTableScan [key#58,value#59], (MetastoreRelation default, a, None), None
        Exchange (HashPartitioning [key#60], 200)
         HiveTableScan [key#60,value#61], (MetastoreRelation default, b, None), None
       Exchange (HashPartitioning [key#62], 200)
        HiveTableScan [key#62,value#63], (MetastoreRelation default, c, None), None
     Exchange (HashPartitioning [key#64], 200)
      HiveTableScan [key#64,value#65], (MetastoreRelation default, d, None), None
  Exchange (HashPartitioning [key#66], 200)
   HiveTableScan [key#66,value#67], (MetastoreRelation default, e, None), None

This would greatly improve efficiency, especially for outer joins. We had some real-world cases of multiway full outer joins on the same key: they produced a lot of null keys (causing data skew, which Hive avoids) along with redundant shuffles, and finally ran OOM.
I want to try meetPartitions: when constructing the plan tree we can save both children's outputPartitioning together with the BinaryNode's own outputPartitioning (possibly redundant), which achieves this easily, and meetPartitions is reset to the node's outputPartitioning when a shuffle is needed, so that an Exchange that is actually required is not removed. A rough sketch of the idea follows.
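
A rough sketch of this meetPartitions idea, reusing the hypothetical Partitioning type from the sketch above (simplified; not the actual patch):

trait PlanNode {
  def outputPartitioning: Partitioning
  // All partitionings this node's output is known to satisfy. An Exchange
  // node would not override this, so after a shuffle the set collapses back
  // to the node's own outputPartitioning.
  def meetPartitions: Seq[Partitioning] = Seq(outputPartitioning)
}

trait BinaryNode extends PlanNode {
  def left: PlanNode
  def right: PlanNode
  // A join that does not move rows preserves both children's partitionings.
  override def meetPartitions: Seq[Partitioning] =
    outputPartitioning +: (left.meetPartitions ++ right.meetPartitions)
}

// No Exchange is needed if any known partitioning matches the requirement.
def satisfies(node: PlanNode, required: Partitioning): Boolean =
  node.meetPartitions.contains(required)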

@yhuai (Contributor) commented Jun 7, 2015

Thanks for the example.

For your query, if you write it as

select a.value,b.value,c.value,d.value,e.value from
a join b 
on a.key = b.key
join c
on a.key = c.key
join d
on a.key = d.key
join e
on a.key = e.key

You will find that the unnecessary exchange operators are gone (I am just pointing out a workaround; I am not questioning the issue at all).

I mean we really need to have the notion that all of the key columns of the tables in the above example are equivalent. I am not sure meetPartitions is the clean solution for this problem.
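
For illustration only, this equivalence could be tracked with a small union-find over the join conditions (a hypothetical sketch, not Spark code): once a.key = b.key, a.key = c.key, b.key = d.key, and c.key = e.key are recorded, data partitioned on any of these columns satisfies a requirement on any other.

import scala.collection.mutable

class EquivClasses {
  private val parent = mutable.Map.empty[String, String]
  // Representative of x's class, with path compression.
  private def find(x: String): String = {
    val p = parent.getOrElseUpdate(x, x)
    if (p == x) x else { val root = find(p); parent(x) = root; root }
  }
  def equate(a: String, b: String): Unit = parent(find(a)) = find(b)
  def equivalent(a: String, b: String): Boolean = find(a) == find(b)
}

val eq = new EquivClasses
eq.equate("a.key", "b.key")
eq.equate("a.key", "c.key")
eq.equate("b.key", "d.key")
eq.equate("c.key", "e.key")
eq.equivalent("d.key", "e.key")  // true: partitioning on d.key also satisfies e.key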

For the case of full outer joins, I guess you meant all records from all tables with a null key are shuffled to the same reducer, right? btw, what is the plan generated by Hive?
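
As a toy illustration of the null-key skew (assuming hash partitioning that, like Spark's HashPartitioner, sends every null key to partition 0):

// All rows with a null join key land in the same partition, so chained full
// outer joins funnel every null key through a single reducer.
def partitionFor(key: Any, numPartitions: Int): Int =
  if (key == null) 0 else Math.floorMod(key.hashCode, numPartitions)

Seq[Any](null, "x", null, "y").map(partitionFor(_, 200))  // nulls all map to 0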

@jeanlyn (Contributor, Author) commented Jun 8, 2015

@yhuai, yes, in the full outer join case Spark SQL shuffles all records with a null key to the same reducer, and the plan Hive generates looks like this:

explain select a.value,b.value,c.value,d.value from
a full outer join b 
on a.key = b.key
full outer join c
on a.key = c.key
full outer join d
on a.key = d.key


STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: a
            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
            Reduce Output Operator
              key expressions: key (type: string)
              sort order: +
              Map-reduce partition columns: key (type: string)
              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
              value expressions: value (type: string)
          TableScan
            alias: b
            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
            Reduce Output Operator
              key expressions: key (type: string)
              sort order: +
              Map-reduce partition columns: key (type: string)
              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
              value expressions: value (type: string)
          TableScan
            alias: c
            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
            Reduce Output Operator
              key expressions: key (type: string)
              sort order: +
              Map-reduce partition columns: key (type: string)
              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
              value expressions: value (type: string)
          TableScan
            alias: d
            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
            Reduce Output Operator
              key expressions: key (type: string)
              sort order: +
              Map-reduce partition columns: key (type: string)
              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
              value expressions: value (type: string)
      Reduce Operator Tree:
        Join Operator
          condition map:
               Outer Join 0 to 1
               Outer Join 0 to 2
               Outer Join 0 to 3
          keys:
            0 key (type: string)
            1 key (type: string)
            2 key (type: string)
            3 key (type: string)
          outputColumnNames: _col1, _col6, _col11, _col16
          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
          Select Operator
            expressions: _col1 (type: string), _col6 (type: string), _col11 (type: string), _col16 (type: string)
            outputColumnNames: _col0, _col1, _col2, _col3
            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

@chenghao-intel has a solution in #6413

@jeanlyn closed this Jun 12, 2015